home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
Language/OS - Multiplatform Resource Library
/
LANGUAGE OS.iso
/
p4
/
p4-1_2b.lha
/
p4-1.2b
/
lib
/
p4_bm.c
< prev
next >
Wrap
C/C++ Source or Header
|
1993-02-06
|
13KB
|
456 lines
#include "p4.h"
#include "p4_sys.h"
/*
 * bm_start: initialize the p4 environment in the "big master" (process 0).
 *
 * argc/argv are handed to process_args() (argc by pointer, presumably so
 * p4-specific arguments can be consumed there -- confirm in process_args).
 * Sets defaults for logging_flag, globmemsize and sserver_port before the
 * arguments are processed, initializes shared memory, allocates the global
 * and local p4 data, optionally redirects stdout and stderr into
 * bm_outfile, and installs this process as entry 0 of the proctable.
 *
 * Returns 0.
 */
int bm_start(argc, argv)
int *argc;
char **argv;
{
    int bm_switch_port;

    sprintf(whoami_p4, "p0_%d", getpid());
    p4_dprintfl(90,"entering bm_start\n");
    trap_sig_errs();		/* Errors can happen any time */

    /* defaults; process_args() may override them */
    logging_flag = FALSE;
    globmemsize = GLOBMEMSIZE;
    sserver_port = 753;
    process_args(argc, argv);

# ifdef SYSV_IPC
    sysv_num_shmids = 0;
    sysv_shmid[0] = -1;
    sysv_semid0 = init_sysv_semset(0);
# endif

    MD_initmem(globmemsize);
    alloc_global();		/* sets p4_global */

    if (*bm_outfile)
    {
	/*
	 * Send stdout and stderr to the same file.  Opening it twice with
	 * "w" truncates it twice and gives each stream an independent file
	 * offset, so output written through one stream overwrites output
	 * written through the other.  Instead, truncate once, then reopen
	 * both streams in append mode so every write lands at end of file.
	 */
	if (freopen(bm_outfile, "w", stdout) == NULL
	    || freopen(bm_outfile, "a", stdout) == NULL
	    || freopen(bm_outfile, "a", stderr) == NULL)
	    p4_error("bm_start: unable to redirect output to bm_outfile", 0);
    }

    p4_local = alloc_local_bm();
    if (p4_local == NULL)
	p4_error("p4_initenv: alloc_local_bm failed\n", 0);

    MD_initenv();

    bm_switch_port = getswport(p4_global->my_host_name);
    usc_init();

    /* big master installing himself */
    install_in_proctable(0, (-1), getpid(), p4_global->my_host_name,
			 0, P4_MACHINE_TYPE, bm_switch_port);
    p4_local->my_id = 0;

    if (logging_flag)
	ALOG_ENABLE;
    else
	ALOG_DISABLE;

    return (0);
}
/*
 * p4_create_procgroup: read the procgroup and start its processes.
 *
 * Only the process with id 0 (the big master) does anything: it stores the
 * result of read_procgroup() in p4_local->procgroup and hands it to
 * p4_startup().  Every other process returns 0 straight away.
 *
 * Returns -1 if read_procgroup() yields NULL, otherwise 0.
 * NOTE(review): the return value of p4_startup() is ignored, so a failed
 * startup is still reported as 0 here.
 */
int p4_create_procgroup()
{
    p4_dprintfl(90,"entering p4_create_procgroup\n");

    if (p4_local->my_id == 0)
    {
	p4_local->procgroup = read_procgroup();
	if (p4_local->procgroup == NULL)
	    return (-1);
	p4_startup(p4_local->procgroup);
    }

    return (0);
}
/*
 * p4_startup: start every process described by the procgroup pg.
 *
 * Builds the proctable from pg.  In CAN_DO_SOCKET_MSGS builds it also sets
 * up an anonymous listener socket when p4_global->local_communication_only
 * is false; that flag is forced false when pg has more than one entry.
 * It then creates the local slaves (create_bm_processes) and, in socket
 * builds, the remote processes (create_remote_processes).  Next it sends
 * the proctable to the remote masters and, on IPSC860/CM5/NCUBE, to the
 * node slaves.  Finally it synchronizes with local slaves and remote
 * masters.
 *
 * Returns 0 in the big master on success.  A local slave forked inside
 * create_bm_processes also returns through here and normally gets 0 from
 * the p4_am_i_cluster_master() check.  Returns -1 if create_bm_processes or
 * create_remote_processes reports failure.
 * NOTE(review): both -1 returns happen while p4_global->slave_lock is still
 * held, so local slaves already waiting on that lock stay blocked.
 */
int p4_startup(pg)
struct p4_procgroup *pg;
{
int i, nslaves, unused_flag;
int listener_port, listener_fd;
struct bm_rm_msg bm_msg;
struct p4_procgroup_entry *local_pg;
p4_dprintfl(90,"entering p4_startup\n");
if (p4_global == NULL)
p4_error("p4 not initialized; perhaps p4_initenv not called",0);
procgroup_to_proctable(pg);
/* any entry beyond "local" means processes on other hosts */
if (pg->num_entries > 1)
p4_global->local_communication_only = FALSE;
# ifdef CAN_DO_SOCKET_MSGS
if (!p4_global->local_communication_only)
{
net_setup_anon_listener(10, &listener_port, &listener_fd);
p4_global->listener_port = listener_port;
p4_global->listener_fd = listener_fd;
p4_dprintfl(90, "setup listener on port %d fd %d\n",
listener_port, listener_fd);
/* the big master's proctable entry advertises the listener port */
p4_global->proctable[0].port = listener_port;
SIGNAL_P4(LISTENER_ATTN_SIGNAL, handle_connection_interrupt);
}
# endif
setup_conntab();
/* held until the proctable is complete; forked local slaves block on it */
p4_lock(&p4_global->slave_lock);
if ((nslaves = create_bm_processes(pg)) < 0)
return (-1);
if (!p4_am_i_cluster_master()) /* I was forked in create_bm_processes */
return(0);
# ifdef CAN_DO_SOCKET_MSGS
if (create_remote_processes(pg) < 0)
return (-1);
# endif
/* let local slaves use proc table to identify themselves */
p4_unlock(&p4_global->slave_lock);
send_proc_table(); /* to remote masters */
# if defined(IPSC860) || defined(CM5) || defined(NCUBE)
/* send initial info and proctable to local slaves */
/* must use p4_i_to_n procs because node slave
does not know if the msg is forwarded from bm */
local_pg = &(pg->entries[0]);
bm_msg.type = p4_i_to_n(INITIAL_INFO);
bm_msg.numinproctab = p4_i_to_n(p4_global->num_in_proctable);
bm_msg.numslaves = p4_i_to_n(local_pg->numslaves_in_group);
bm_msg.debug_level = p4_i_to_n(remote_debug_level);
bm_msg.memsize = p4_i_to_n(globmemsize);
bm_msg.logging_flag = p4_i_to_n(logging_flag);
strcpy(bm_msg.application_id, p4_global->application_id);
strcpy(bm_msg.version, p4_version());
strcpy(bm_msg.pgm, local_pg->slave_full_pathname);
/* each node slave 1..nslaves gets the INITIAL_INFO message, then the whole proctable */
for (i = 1; i <= nslaves; i++)
{
p4_dprintfl(90,"sending initinfo to slave %d of %d\n",i,nslaves);
# if defined(IPSC860)
csend((long) INITIAL_INFO, &bm_msg, (long) sizeof(struct bm_rm_msg),
(long) i, (long) NODE_PID);
csend((long) INITIAL_INFO, p4_global->proctable,
(long) sizeof(p4_global->proctable), (long) i, (long) NODE_PID);
# endif
# if defined(CM5)
CMMD_send_noblock(i, INITIAL_INFO, &bm_msg, sizeof(struct bm_rm_msg));
CMMD_send_noblock(i, INITIAL_INFO, p4_global->proctable, sizeof(p4_global->proctable));
# endif
# if defined(NCUBE)
nwrite(&bm_msg, sizeof(struct bm_rm_msg), i, INITIAL_INFO, &unused_flag);
nwrite(p4_global->proctable, sizeof(p4_global->proctable), i, INITIAL_INFO, &unused_flag);
# endif
p4_dprintfl(90,"sent initinfo to slave %d of %d\n",i,nslaves);
}
# endif
/* cluster id range for this process's cluster, derived from its own proctable entry */
p4_global->low_cluster_id =
p4_local->my_id - p4_global->proctable[p4_local->my_id].slave_idx;
p4_global->hi_cluster_id =
p4_global->low_cluster_id + p4_global->local_slave_count + 1;
/*
sync with local slaves, thus ensuring that they have the proctable before
syncing with remotes (this keeps remotes from interrupting the local
processes too early); then re-sync with local slaves (thus permitting them
to interrupt remotes)
*/
p4_barrier(&(p4_global->cluster_barrier),p4_num_cluster_ids());
/*
NEED A SYNC WITH LOCALS THAT DOES A BARRIER WITH PROCS THAT SHARE
MEMORY AND MP BARRIER WITH OTHER "LOCAL" PROCESSES
*/
sync_with_remotes();
p4_barrier(&(p4_global->cluster_barrier),p4_num_cluster_ids());
return (0);
}
/*
 * create_bm_processes: create the big master's local slaves and, when
 * socket messages are in use, fork off the listener process.
 *
 * pg->entries[0] describes the local group; its numslaves_in_group is the
 * number of local slaves.
 *
 * On IPSC860/CM5/NCUBE the slaves run on nodes 1..nslaves.  The master
 * sends each one a SYNC_MSG, receives an INITIAL_INFO reply and installs
 * the reported data in the proctable.  On CM5 and NCUBE it then sends DIE
 * to the remaining nodes.
 *
 * Elsewhere each slave is created with fork_p4().  After the slaves exist,
 * and unless local_communication_only is set, the listener is forked too
 * (CAN_DO_SOCKET_MSGS).
 *
 * Returns nslaves in the big master.  A forked local slave also returns
 * from here after its startup barriers.  It returns 0, a well-defined
 * non-negative value, so that the caller (p4_startup) does not mistake it
 * for an error.  The caller tells the two cases apart with
 * p4_am_i_cluster_master().
 */
int create_bm_processes(pg)
struct p4_procgroup *pg;
{
    struct p4_procgroup_entry *local_pg;
    struct listener_data *l;
    int i, nslaves, end_1, end_2;
    int slave_pid, listener_pid;
    int slave_idx, listener_fd;
    int port, switch_port, from, type, unused_flag;
    struct bm_rm_msg bm_msg;

    p4_dprintfl(90,"entering create_bm_processes\n");

    local_pg = &(pg->entries[0]);
    nslaves = local_pg->numslaves_in_group;
# if !defined(IPSC860) && !defined(CM5) && !defined(NCUBE)
    if (nslaves > P4_MAX_MSG_QUEUES)
	p4_error("more slaves than msg queues \n", nslaves);
# endif

    /* alloc listener local data since this proc will eventually become listener */
# ifdef CAN_DO_SOCKET_MSGS
    if (!(p4_global->local_communication_only))
    {
	listener_fd = p4_global->listener_fd;
	listener_info = alloc_listener_info();
	l = listener_info;
	/* end_1 is kept by the master and each slave (p4_local->listener_fd);
	   end_2 is kept by the listener (l->slave_fd) */
	get_pipe(&end_1, &end_2);
    }
# endif

# ifdef TCMP
    tcmp_init(NULL,p4_get_my_cluster_id(),shmem_getclunid());
# endif

# if defined(IPSC860) || defined(CM5) || defined(NCUBE)
    for (i = 1; i <= nslaves; i++)
    {
	p4_dprintfl(90,"doing initial sync with local slave %d\n",i);
# if defined(IPSC860)
	csend((long) SYNC_MSG, &bm_msg, (long) sizeof(struct bm_rm_msg),
	      (long) i, (long) NODE_PID);
	crecv(INITIAL_INFO, &bm_msg, (long) sizeof(struct bm_rm_msg));
# endif
# if defined(CM5)
	CMMD_send_noblock(i, SYNC_MSG, &bm_msg, sizeof(struct bm_rm_msg));
	CMMD_receive(CMMD_ANY_NODE, INITIAL_INFO, (void *) &bm_msg, sizeof(struct bm_rm_msg));
# endif
# if defined(NCUBE)
	nwrite(&bm_msg, sizeof(struct bm_rm_msg), i, SYNC_MSG, &unused_flag);
	from = NCUBE_ANY_NODE;
	type = INITIAL_INFO;
	nread(&bm_msg, sizeof(struct bm_rm_msg), &from, &type, &unused_flag);
# endif
	port = p4_n_to_i(bm_msg.port);
	slave_idx = p4_n_to_i(bm_msg.slave_idx);
	slave_pid = p4_n_to_i(bm_msg.slave_pid);
	switch_port = p4_n_to_i(bm_msg.switch_port);
	/* big master installing local slaves */
	install_in_proctable(0, port, slave_pid, bm_msg.host_name,
			     slave_idx, P4_MACHINE_TYPE, switch_port);
	p4_global->local_slave_count++;
    }
# else
    for (slave_idx = 1; slave_idx <= nslaves; slave_idx++)
    {
	p4_dprintfl(20, "creating local slave %d of %d\n",slave_idx,nslaves);
	slave_pid = fork_p4();
	if (slave_pid < 0)
	    p4_error("create_bm_processes fork", slave_pid);
	else
	    if (slave_pid)
		p4_dprintfl(10, "created local slave %d\n", slave_idx);

	if (slave_pid == 0)	/* At this point, we are the slave. */
	{
	    sprintf(whoami_p4, "bm_slave_%d_%d", slave_idx, getpid());

	    p4_free(p4_local);	/* Doesn't work for weird memory model. */
	    p4_local = alloc_local_slave();

# ifdef CAN_DO_SOCKET_MSGS
	    if (!(p4_global->local_communication_only))
	    {
		p4_local->listener_fd = end_1;
		close(end_2);
		close(listener_fd);
	    }
	    SIGNAL_P4(LISTENER_ATTN_SIGNAL, handle_connection_interrupt);
# endif

	    /* hang for a valid proctable */
	    p4_lock(&p4_global->slave_lock);
	    p4_unlock(&p4_global->slave_lock);

	    p4_local->my_id = p4_get_my_id_from_proc();
	    setup_conntab();
	    sprintf(whoami_p4, "p%d_%d", p4_local->my_id, getpid());
	    usc_init();

# ifdef TCMP
	    tcmp_init(NULL,p4_get_my_cluster_id(),shmem_getclunid());
# endif

	    /*
	      sync with master twice: once to make sure all slaves have
	      got proctable, and second after the master has synced with the
	      remote processes
	    */
	    p4_barrier(&(p4_global->cluster_barrier),p4_num_cluster_ids());
	    p4_barrier(&(p4_global->cluster_barrier),p4_num_cluster_ids());

	    p4_dprintfl(20, "local slave starting\n");
	    ALOG_SETUP(p4_local->my_id,ALOG_TRUNCATE);
	    ALOG_LOG(p4_local->my_id,BEGIN_USER,0,"");
	    /* The caller uses this value; a bare "return;" would hand it an
	       indeterminate int that could look like an error (< 0). */
	    return (0);
	}

	/* master installing local slaves */
	install_in_proctable(0, p4_global->listener_port, slave_pid,
			     p4_global->my_host_name,
			     slave_idx, P4_MACHINE_TYPE,
			     p4_global->proctable[0].switch_port);
	p4_global->local_slave_count++;
    }
# endif

# if defined(CM5)
    for (i=nslaves+1; i < CMMD_partition_size(); i++)
	CMMD_send_noblock(i, DIE, &bm_msg, sizeof(struct bm_rm_msg));
# endif
# if defined(NCUBE)
    for (i=nslaves+1; i < ncubesize(); i++)
	nwrite(&bm_msg, sizeof(struct bm_rm_msg), i, DIE, &unused_flag);
# endif

    /* Done creating slaves. Now fork off the listener */
# if !defined(IPSC860) && !defined(CM5) && !defined(NCUBE)
# ifdef CAN_DO_SOCKET_MSGS
    if (!(p4_global->local_communication_only))
    {
	listener_pid = fork_p4();
	if (listener_pid < 0)
	    p4_error("create_bm_processes listener fork", listener_pid);
	if (listener_pid == 0)
	{
	    sprintf(whoami_p4, "bm_list_%d", getpid());
	    /* Inside listener */
	    p4_local = alloc_local_listener();
	    l->listening_fd = listener_fd;
	    l->slave_fd = end_2;
	    close(end_1);
	    listener();
	    exit(0);
	}
    }
# endif

    /* Else we're still in the big master */
    sprintf(whoami_p4, "p0_%d", getpid());

    /* We need to close the fds from the listener setup */
# ifdef CAN_DO_SOCKET_MSGS
    if (!(p4_global->local_communication_only))
    {
	p4_local->listener_fd = end_1;
	close(listener_fd);
	close(end_2);
	p4_global->listener_pid = listener_pid;
    }
# endif
# endif

    dump_global(80);
    p4_dprintfl(90, "create_bm_processes: exiting\n");
    return (nslaves);
}
/*
 * procgroup_to_proctable: lay out p4_global->proctable from procgroup pg.
 *
 * pg's first entry must be named "local"; otherwise this is a fatal
 * p4_error.  Entry 0 of the table is this process, on my_host_name with
 * group_id 0.  Then, for each procgroup entry i in order,
 * numslaves_in_group consecutive table slots are filled with that entry's
 * host (my_host_name for entry 0) and group_id i.  Every host name is
 * passed through get_qualified_hostname().  num_in_proctable is set to the
 * number of slots filled.  A procgroup needing more slots than the
 * proctable array holds is a fatal p4_error rather than an out-of-bounds
 * write.
 */
P4VOID procgroup_to_proctable(pg)
struct p4_procgroup *pg;
{
    int i, j, ptidx, max_entries;
    struct p4_procgroup_entry *pe;

    if (strcmp(pg->entries[0].host_name,"local") != 0)
	p4_error("local is not first entry in procgroup ",0);

    /* capacity of the proctable array (elsewhere the whole table is sent
       using sizeof(p4_global->proctable), so it is a real array) */
    max_entries = (int) (sizeof(p4_global->proctable) /
			 sizeof(p4_global->proctable[0]));

    strcpy(p4_global->proctable[0].host_name,p4_global->my_host_name);
    get_qualified_hostname(p4_global->proctable[0].host_name);
    p4_global->proctable[0].group_id = 0;

    ptidx = 1;
    for (i=0, pe=pg->entries; i < pg->num_entries; i++, pe++)
    {
	for (j=0; j < pe->numslaves_in_group; j++)
	{
	    if (ptidx >= max_entries)
		p4_error("procgroup_to_proctable: too many processes for proctable ",
			 ptidx);
	    if (i == 0)
		strcpy(p4_global->proctable[ptidx].host_name,p4_global->my_host_name);
	    else
		strcpy(p4_global->proctable[ptidx].host_name,pe->host_name);
	    get_qualified_hostname(p4_global->proctable[ptidx].host_name);
	    p4_global->proctable[ptidx].group_id = i;
	    ptidx++;
	}
    }
    /* set once, after all entries are placed */
    p4_global->num_in_proctable = ptidx;
}
P4VOID sync_with_remotes()
{
struct bm_rm_msg msg;
int i, fd, node, num_rms, rm[P4_MAXPROCS];
p4_dprintfl(90, "sync_with_remotes: starting\n");
# ifdef CAN_DO_SOCKET_MSGS
p4_get_cluster_masters(&num_rms, rm);
for (i = 1; i < num_rms; i++)
{
node = rm[i];
fd = p4_local->conntab[node].port;
net_recv(fd, &msg, sizeof(msg));
msg.type = p4_n_to_i(msg.type);
if (msg.type != SYNC_MSG)
p4_error("sync_with_remotes: bad type rcvd\n",msg.type);
}
for (i = 1; i < num_rms; i++)
{
node = rm[i];
fd = p4_local->conntab[node].port;
msg.type = p4_i_to_n(SYNC_MSG);
net_send(fd, &msg, sizeof(msg), FALSE);
}
# endif
}
P4VOID send_proc_table()
{
int slave_idx, ent;
int fd;
struct bm_rm_msg msg;
struct proc_info *pe;
p4_dprintfl(90, "send_proc_table: starting\n");
# ifdef CAN_DO_SOCKET_MSGS
for (slave_idx = 1; slave_idx < p4_global->num_in_proctable; slave_idx++)
{
if (p4_global->proctable[slave_idx].slave_idx != 0)
continue;
fd = p4_local->conntab[slave_idx].port;
p4_dprintfl(90, "sending proctable to slave %d on %d:\n", slave_idx, fd);
if (fd < 0)
p4_error("send_proc_table: rm entry doesn't have valid fd", fd);
for (ent = 0, pe = p4_global->proctable;
ent < p4_global->num_in_proctable; ent++, pe++)
{
msg.type = p4_i_to_n(PROC_TABLE_ENTRY);
msg.port = p4_i_to_n(pe->port);
msg.unix_id = p4_i_to_n(pe->unix_id);
msg.slave_idx = p4_i_to_n(pe->slave_idx);
msg.group_id = p4_i_to_n(pe->group_id);
strcpy(msg.host_name, pe->host_name);
strcpy(msg.machine_type,pe->machine_type);
msg.switch_port = p4_i_to_n(pe->switch_port);
net_send(fd, &msg, sizeof(msg), FALSE);
}
p4_dprintfl(90, " sending end_of_proc_table\n");
msg.type = p4_i_to_n(PROC_TABLE_END);
net_send(fd, &msg, sizeof(msg), FALSE);
}
# endif
}